Companion notebook #1 of 4 of the paper
M.C.Soini et al., On the market displacement of incumbent grid-connected electricity storage by more efficient storage.
The notebook is published as a static html file and as a Jupyter notebook file; please find the notebook file here.
This notebook runs the single-storage SymEnergy model and compare the model's results with manually constructed results as shown in figure 4 of the paper.
Instructions
import sys
import pandas as pd
import numpy as np
import sympy as sp
import itertools
from tqdm import tqdm
from collections import namedtuple
import time
from bokeh.io import show, output_notebook
output_notebook(verbose=False)
sp.init_printing(pretty_print=False)
from symenergy import Model
from symenergy import Evaluator
from symenergy import logger
from symenergy.evaluator import plotting
from symenergy import cache_params
from symenergy import multiproc_params
cache_params['path'] = './symenergy_cache/single_storage'
logger.setLevel('WARNING')
# Additional assumption on the behavior of the system:
# g (dispatchable power plant) power production non-zero
# -> all positivity constraints of dispatchable
# power production are non-binding ("False")
constraint_filt = ('act_lb_g_pos_p_0nig == False and '
'act_lb_g_pos_p_1mor == False and '
'act_lb_g_pos_p_2day == False and '
'act_lb_g_pos_p_3eve == False'
)
m_pwr = Model(curtailment=False, nworkers=10, constraint_filt=constraint_filt)
m_pwr.add_slot(name='0nig', load=10000, weight=6)
m_pwr.add_slot(name='1mor', load=10000, weight=6)
m_pwr.add_slot(name='2day', load=10000, weight=6)
m_pwr.add_slot(name='3eve', load=10000, weight=6)
m_pwr.add_plant(name='g', vc0=0, vc1=1)
m_pwr.add_storage(name='phs', eff=0.99, capacity=1, energy_cost=1e-12)
m_pwr.generate_solve()
Evaluate the model for selected numerical parameter values and plot the results.
logger.setLevel('WARNING')
multiproc_params['nworkers'] = 'default'
x_vals = {
m_pwr.slots['0nig'].l: np.linspace(3000, 10000, 7),
m_pwr.slots['1mor'].l: np.linspace(3000, 10000, 7),
m_pwr.slots['2day'].l: np.linspace(3000, 10000, 7),
m_pwr.slots['3eve'].l: np.linspace(9000, 10000, 2),
m_pwr.comps['phs'].eff: np.linspace(0.6, 0.999, 3),
m_pwr.comps['phs'].C: np.linspace(0, 1000, 3),
}
ev = Evaluator(m_pwr, x_vals, drop_non_optimum=True)
ev.get_evaluated_lambdas_parallel()
multiproc_params['nworkers'] = 1
ev.expand_to_x_vals_parallel()
balplot = plotting.BalancePlot(ev, ind_axx='C_phs_none', ind_pltx='slot', ind_plty=None, plot_width=300)
show(balplot._get_layout())
logger.setLevel('WARNING')
multiproc_params['nworkers'] = 'default'
# g (dispatchable power plant) power production non-zero -> all g positivity constraint non-binding ("False")
constraint_filt = ('act_lb_g_pos_p_0nig == False and '
'act_lb_g_pos_p_1mor == False and '
'act_lb_g_pos_p_2day == False and '
'act_lb_g_pos_p_3eve == False')
m_erg = Model(curtailment=False, nworkers=10, constraint_filt=constraint_filt)
m_erg.add_slot(name='0nig', load=10000, weight=6)
m_erg.add_slot(name='1mor', load=10000, weight=6)
m_erg.add_slot(name='2day', load=10000, weight=6)
m_erg.add_slot(name='3eve', load=10000, weight=6)
m_erg.add_plant(name='g', vc0=0, vc1=1)
m_erg.add_storage(name='phs', eff=0.99, energy_capacity=1, energy_cost=1e-12, charging_to_energy_factor='eta')
m_erg.generate_solve()
Evaluate the model for certain numerical parameter values and plot the results.
logger.setLevel('WARNING')
multiproc_params['nworkers'] = 'default'
x_vals = {
m_erg.slots['0nig'].l: np.linspace(3000, 10000, 7),
m_erg.slots['1mor'].l: np.linspace(3000, 10000, 7),
m_erg.slots['2day'].l: np.linspace(3000, 10000, 7),
m_erg.slots['3eve'].l: np.linspace(9000, 10000, 2),
m_erg.comps['phs'].eff: [0.9],
m_erg.comps['phs'].E: np.linspace(0, 6000, 4),
}
ev = Evaluator(m_erg, x_vals, drop_non_optimum=False)
ev.get_evaluated_lambdas_parallel()
multiproc_params['nworkers'] = 1
ev.expand_to_x_vals_parallel()
balplot = plotting.BalancePlot(ev, ind_axx='E_phs_none', ind_pltx='slot', ind_plty=None, plot_width=300)
show(balplot())
Verification of the blueprint in the upper part of figure 4 by comparison to solutions obtained from the Lagrange approach.
The following cells define various auxiliary functions for the manual construction of the results.
tolist = lambda lst: '(%s)' % ', '.join(lst)
def get_operation(x, model):
''' Get strings encoding operational patterns. '''
batphs = 'phs'
slots = model.slots
is_E_capacity_constrained = 'E_phs_none' in model.comps['phs'].parameters('name')
is_C_capacity_constrained = 'C_phs_none' in model.comps['phs'].parameters('name')
list_c = ([False if x[f'act_lb_{batphs}_pos_pchg_{slot}'] else 'c' for slot in slots])
list_d = ([False if x[f'act_lb_{batphs}_pos_pdch_{slot}'] else 'd' for slot in slots])
if is_C_capacity_constrained:
list_c = [c.upper() if x[f'act_lb_{batphs}_pchg_cap_C_{slot}'] and c else c for slot, c in zip(slots, list_c)]
list_d = [d.upper() if x[f'act_lb_{batphs}_pdch_cap_C_{slot}'] and d else d for slot, d in zip(slots, list_d)]
list_cd = ['-' if not c and not d else (c if c else d) for c, d in zip(list_c, list_d)]
list_eC = (['E' if is_E_capacity_constrained and x[f'act_lb_{batphs}_e_cap_E_{slot}'] else False for slot in slots])
list_e0 = (['0' if x[f'act_lb_{batphs}_pos_e_{slot}'] else False for slot in slots])
list_e = ['-' if not c and not d else (c if c else d) for c, d in zip(list_eC, list_e0)]
idling = list_cd == ['-'] * 4
name_batphs = batphs
return pd.Series({f'cd_pattern_{name_batphs}': ' ' + tolist(list_e) + ' '+ tolist(list_cd),
f'is_idling_{name_batphs}': idling})
def get_blocks(df, model):
'''
Get blocks of time slots separated by zero and/or capacity-constrained energy
'''
slots = model.slots
is_E_capacity_constrained = 'E_phs_none' in model.comps['phs'].parameters('name')
if is_E_capacity_constrained:
list_cap_E = df[[f'act_lb_phs_e_cap_E_{slot}' for slot in slots]].astype(int).tolist()
else:
list_cap_E = [0, 0, 0, 0]
list_pos_E = df[[f'act_lb_phs_pos_e_{slot}' for slot in slots]].astype(int).tolist()
if sum(list_cap_E) == 0 and sum(list_pos_E) == 1:
return [0] * 4
df_blocks = pd.DataFrame({'act_cap_E': list_cap_E, 'act_pos_E': list_pos_E, 'slot_id': range(4)})
df_blocks = pd.concat([df_blocks.iloc[[-1], :],
df_blocks])
df_blocks['diff_cap'] = df_blocks.act_cap_E.diff()
df_blocks['diff_pos'] = df_blocks.act_pos_E.diff()
if df_blocks.act_cap_E.any() and -1 in df_blocks['diff_cap'].values:
shift = np.where(df_blocks['diff_cap'].values == -1)[0][0] - 1
elif df_blocks.act_pos_E.sum() > 0 and -1 in df_blocks['diff_pos'].values:
shift = np.where(df_blocks['diff_pos'].values == -1)[0][0] - 1
else: shift = 0
df_blocks = df_blocks.iloc[1:]
df_blocks['slot_id_shift'] = np.roll(df_blocks.slot_id, int(shift))
df_blocks = df_blocks.sort_values('slot_id_shift')
df_blocks = pd.concat([df_blocks.iloc[[-1], :],
df_blocks])
df_blocks['diff_cap'] = df_blocks.act_cap_E.diff()
df_blocks['diff_pos'] = df_blocks.act_pos_E.diff()
block = [0] *4
iblock = 0
is_discharging = True
for i, (cap, pos, last_cap, last_pos) in enumerate(list(zip(df_blocks.act_cap_E, df_blocks.act_pos_E,
df_blocks.act_cap_E.shift(1), df_blocks.act_pos_E.shift(1)))[1:]):
if last_pos == 1:
is_discharging = False
if last_cap == 1:
is_discharging = True
start_after_cap = cap == 0 and last_cap == 1
start_after_pos = pos == 0 and last_pos == 1
stays_full = cap == 1 and last_cap == 1
stays_empty = pos == 1 and last_pos == 1
if start_after_cap or start_after_pos or stays_full or stays_empty:
iblock += 1
block[i] = iblock * (10 if is_discharging else 1)
return np.roll(block, shift)
Various functions to get the components as shown in figure 4 of the paper ("unconstrained operation drivers", "power constraint terms", etc)
def get_single_N(this_charging, other_charging, this_slot, other_slot):
other_w = getattr(w, other_slot)
this_l = getattr(l, this_slot)
other_l = getattr(l, other_slot)
return {(True, True): other_w * eff**2 * (other_l - this_l),
(True, False): other_w * (eff * other_l - this_l),
(False, True): other_w * eff * (other_l - this_l * eff),
(False, False): other_w * (other_l - this_l),
}[(this_charging, other_charging)]
def get_single_D(other_charging, other_slot):
other_w = getattr(w, other_slot)
return {(True): other_w * eff**2,
(False): other_w
}[other_charging]
def get_single_N_C(this_charging, other_charging, other_slot):
other_w = getattr(w, other_slot)
return {(True, True): - other_w * C * eff**2,
(True, False): + other_w * C * eff,
(False, True): - other_w * C * eff,
(False, False): + other_w * C,
}[(this_charging, other_charging)]
def get_N_E(this_charging, is_charging_block):
return {(True, True): E * eff,
(True, False): E,
(False, True): - E * eff,
(False, False): - E,
}[(is_charging_block, this_charging)]
def get_sign(this_charging):
return {True: +1, False: -1}[this_charging]
def get_N_unconstr(is_charging, slots, slct_idx, same_block_indices, is_active, is_power_cap_constr):
return sum(get_single_N(this_charging=is_charging[slct_idx],
other_charging=is_charging[other_idx],
this_slot=slots[slct_idx],
other_slot=slots[other_idx])
for other_idx in same_block_indices
if is_active[other_idx] and not is_power_cap_constr[other_idx])
def get_D(is_charging, slots, same_block_indices, is_active, is_power_cap_constr):
return sum(get_single_D(other_charging=is_charging[other_idx],
other_slot=slots[other_idx])
for other_idx in same_block_indices
if is_active[other_idx] and not is_power_cap_constr[other_idx])
def get_N_C(is_charging, slots, slct_idx, same_block_indices, is_active, is_power_cap_constr):
return sum(get_single_N_C(this_charging=is_charging[slct_idx],
other_charging=is_charging[other_idx],
other_slot=slots[other_idx])
for other_idx in same_block_indices
if is_active[other_idx] and is_power_cap_constr[other_idx])
compare_row function¶For each row (constraint combination) of the solution table, construct the result manually and compare with the model's result.
def compare_row(row, model):
is_E_capacity_constrained = 'E_phs_none' in model.comps['phs'].parameters('name')
is_C_capacity_constrained = 'C_phs_none' in model.comps['phs'].parameters('name')
slots = [slot[1:] for slot in model.slots]
nslots = list(model.slots)
if row.is_idling_phs:
return
is_charging = (1 - row[[r for r in row.index if 'pos_pchg' in r]].astype(int)).values
is_discharging = (1 - row[[r for r in row.index if 'pos_pdch' in r]].astype(int)).values
is_cap_E_constrained = (row[[r for r in row.index if 'cap_E' in r]].astype(int).values
if is_E_capacity_constrained
else np.zeros(4))
is_power_cap_constr = (np.array([row[f'act_lb_phs_pchg_cap_C_{slot}'] or row[f'act_lb_phs_pdch_cap_C_{slot}'] for slot in model.slots])
if is_C_capacity_constrained
else np.zeros(4))
is_zero_E = row[[r for r in row.index if 'pos_e' in r]].astype(int).values
is_active = np.array([(a or b) for a, b in zip(is_charging, is_discharging)])
block_indices = np.array(row.blocks)
(slct_indices,) = np.where(is_active)
any_binding_E = row[[r for r in row.index if 'cap_E' in r]].sum() > 0
# loop over slots with active storage power
for slct_idx in slct_indices:
(same_block_indices,) = np.where(np.array(block_indices) == block_indices[slct_idx])
is_charging_block = block_indices[same_block_indices][0] < 10
is_incomplete_block = False
# last index *before* the block and last index of the block
dict_prev_last_ind = {
(0, 1): (3, 1), (1, 2): (0, 2), (2, 3): (1, 3), (3, 0): (2, 0),
(0, 1, 2): (3, 2), (1, 2, 3): (0, 3), (2, 3, 0): (1, 0), (3, 0, 1): (2, 1)}
dict_prev_last_ind = {tuple(set(k)): v for k, v in dict_prev_last_ind.items()}
ind_before_block, last_ind = (dict_prev_last_ind[tuple(set(same_block_indices))]
if tuple(set(same_block_indices)) in dict_prev_last_ind
else (None, None))
# partial discharging/charging between two zero E/two capacity constrained slots
if ind_before_block is not None:
is_incomplete_block = (len(same_block_indices) > 1
and ((is_cap_E_constrained[ind_before_block] and is_cap_E_constrained[last_ind])
or (is_zero_E[ind_before_block] and is_zero_E[last_ind])))
# get constructed solution
if not is_power_cap_constr[slct_idx]:
N = get_N_unconstr(is_charging, slots, slct_idx, same_block_indices, is_active, is_power_cap_constr)
N += get_N_C(is_charging, slots, slct_idx, same_block_indices, is_active, is_power_cap_constr)
if any_binding_E and not is_incomplete_block:
N += get_N_E(this_charging=is_charging[slct_idx], is_charging_block=is_charging_block)
D = get_D(is_charging, slots, same_block_indices, is_active, is_power_cap_constr)
sgn = get_sign(this_charging=is_charging[slct_idx])
result_constr = sgn * N / D
else:
result_constr = C
# get solution from Lagrange approach
var_name = f'phs_p{"chg" if is_charging[slct_idx] else "dch"}_{nslots[slct_idx]}'
result_expect = model.get_results_dict(idx=row.idx, slct_var_mlt=[var_name], substitute={'ec_phs_none': 0})[var_name]
dict_subs_powers = {2.0: 2, eff**1.0: eff, 3.0: 3, 4.0: 4, 5.0: 5, 6.0: 6, 7.0: 7, 8.0: 8}
result_expect = sp.simplify(sp.simplify(result_expect.subs(dict_subs_powers)).subs(dict_subs_powers))
# compare solutions;
if sp.simplify(result_expect - result_constr) != 0:
print('idx: ', row.idx, 'slot:', slct_idx)
print('EXPECT', result_expect)
print('CONSTR', result_constr)
print('slct_idx', slct_idx, slots)
print('is_power_cap_constr', is_power_cap_constr)
print('cd_pattern_phs', row.cd_pattern_phs)
raise ValueError('Solutions don\'t match')
def get_sympy_symbols(model):
'''Retrieve the sympy symbols from the model instance
required to construct the model solutions.'''
slots = np.array(['nig', 'mor', 'day', 'eve'])
nslots = list(map(''.join, zip(map(str, range(4)), slots)))
L = namedtuple('L', slots)
W = namedtuple('W', slots)
# extract symbols from model parameters
(vc1_g_none,
l_0nig, w_0nig, l_1mor, w_1mor,
l_2day, w_2day, l_3eve, w_3eve,
ec_phs_none, eff_phs_none,
E_phs_none, vre_scale_none
) = model.parameters('symb')
# aggregate for convenience
l = L(l_0nig, l_1mor, l_2day, l_3eve)
w = W(w_0nig, w_1mor, w_2day, w_3eve)
eff = eff_phs_none
E = E_phs_none
return l, w, eff, E
Applied the compare_row function to each row of the result DataFrame.
If no errors are thrown, all constructed results are identical to the ones obtained from the model.get_results_dict method.
for model, pwrerg in [(m_erg, 'energy'), (m_pwr, 'power ')]:
l, w, eff, E = get_sympy_symbols(model)
C = E
df_comb = pd.concat([model.df_comb, model.df_comb.apply(get_operation, model=model, axis=1)], axis=1)
tqdm.pandas(desc=f'Get blocks for model with {pwrerg} constraints ')
df_comb['blocks'] = df_comb.progress_apply(get_blocks, model=model, axis=1)
tqdm.pandas(desc=f'Compare solutions of model with {pwrerg} constraints')
df_comb.progress_apply(compare_row, model=model, axis=1)